package Util;

import com.mongodb.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.streaming.kafka010.OffsetRange;

import java.util.HashMap;
import java.util.Map;

/**
 * Stores and looks up Kafka consumer offsets in the MongoDB collection
 * {@code Const.SPRAK_KAFKA_COLLECTION} of database {@code Const.Mongo_DB}.
 *
 * <p>Each document carries the fields {@code consumerGroup}, {@code leader},
 * {@code topic}, {@code partition}, {@code fromOffset}, {@code untilOffset}
 * and {@code createOrUpdateTime}.
 *
 * Created by hy on 2017/01/17 0017.
 */
public class DBOffsets {
    private static final DBCollection _offsetsCollection =
            MongoUtil.instance.getDBCollection(Const.Mongo_DB, Const.SPRAK_KAFKA_COLLECTION);

    /**
     * Upserts the offset range processed for one topic partition.
     *
     * <p>The lookup matches a document for the same consumer group, leader, topic and
     * partition whose stored {@code untilOffset} equals {@code item.fromOffset()}; that
     * document is updated with the new range. If no document matches, a new one is inserted.
     *
     * @param broker        value stored in (and matched against) the {@code leader} field
     * @param consumerGroup Kafka consumer group the offsets belong to
     * @param item          offset range (topic, partition, from/until offsets) to persist
     */
    public static void SaveDBOffset(String broker, String consumerGroup, OffsetRange item) {
        BasicDBObject searchQuery = new BasicDBObject("consumerGroup", consumerGroup)
                .append("leader", broker)
                .append("topic", item.topic())
                .append("partition", item.partition())
                .append("untilOffset", item.fromOffset());

        // "leader" must be written with the same value the queries filter on (the broker
        // argument); writing a different value would make the upserted document invisible
        // to later saves and to GetDBConsumerOffset.
        BasicDBObject updateFields = new BasicDBObject("fromOffset", item.fromOffset())
                .append("untilOffset", item.untilOffset())
                .append("createOrUpdateTime", Utils.timeStamp())
                .append("leader", broker)
                .append("consumerGroup", consumerGroup);

        // upsert = true, multi = false
        _offsetsCollection.update(searchQuery, new BasicDBObject("$set", updateFields), true, false);
    }

    /**
     * Reads the highest stored {@code untilOffset} for the given topic partition.
     *
     * @param broker        value to match against the {@code leader} field
     * @param consumerGroup Kafka consumer group to look up
     * @param topic         Kafka topic name
     * @param partition     partition number within the topic
     * @return a single-entry map from the topic partition to its latest {@code untilOffset},
     *         or {@code null} when no matching document exists
     */
    public static Map<TopicPartition, Long> GetDBConsumerOffset(String broker, String consumerGroup, String topic, int partition) {
        BasicDBObject queryObject = new BasicDBObject()
                .append(QueryOperators.AND, new BasicDBObject[]{
                        new BasicDBObject("leader", broker),
                        new BasicDBObject("consumerGroup", consumerGroup),
                        new BasicDBObject("topic", topic),
                        new BasicDBObject("partition", partition)
                });

        // Sort descending by untilOffset and keep only the first document, i.e. the latest range.
        DBCursor dbCursor = _offsetsCollection.find(queryObject)
                .sort(new BasicDBObject("untilOffset", -1))
                .limit(1);
        try {
            if (!dbCursor.hasNext()) {
                // Callers distinguish "nothing stored yet" by a null result.
                return null;
            }
            DBObject latest = dbCursor.next();
            Map<TopicPartition, Long> dbOffset = new HashMap<TopicPartition, Long>();
            dbOffset.put(
                    new TopicPartition(
                            latest.get("topic").toString(),
                            Integer.parseInt(latest.get("partition").toString())),
                    Long.parseLong(latest.get("untilOffset").toString()));
            return dbOffset;
        } finally {
            // Release the server-side cursor even if parsing throws.
            dbCursor.close();
        }
    }

}
